﻿using System;
using System.IO;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace PlayRx.ServerSide
{
    static class TestCopyFile_TaskWrapper
    {
        #region "copy as observable"

        /// <summary>
        /// Builds the observable pipeline that moves data from <paramref name="inputStream"/> to
        /// <paramref name="outputStream"/>: buffers come from a <c>ReadStreamObservable</c>, pass through
        /// <c>MinimumBuffer</c>, and each resulting buffer is written with the stream's
        /// BeginWrite/EndWrite pair.
        /// </summary>
        /// <param name="inputStream">Stream handed to <c>ReadStreamObservable</c> as the data source.</param>
        /// <param name="readSize">Size argument handed to <c>ReadStreamObservable</c>.</param>
        /// <param name="outputStream">Stream that receives every buffer through asynchronous writes.</param>
        /// <param name="writeSize">Argument handed to <c>MinimumBuffer</c>.</param>
        /// <returns>A sequence carrying one <see cref="Unit"/> for each write that completes.</returns>
        private static IObservable<Unit> InternalCopyAsObservable(Stream inputStream, int readSize, Stream outputStream, int writeSize)
        {
            // Sink side: wrap the APM-style BeginWrite/EndWrite pair as a function returning an observable.
            Func<byte[], int, int, IObservable<Unit>> writeAsync =
                Observable.FromAsyncPattern<byte[], int, int>(outputStream.BeginWrite, outputStream.EndWrite);

            // Source side: byte buffers produced from the input stream.
            IObservable<byte[]> chunks = new ReadStreamObservable(inputStream, readSize);

            // Glue: regroup the buffers (presumably to at least writeSize bytes -- MinimumBuffer is defined
            // elsewhere, confirm there), write each one in full, and surface the write results.
            return chunks
                .MinimumBuffer(writeSize)
                .SelectMany(
                    buffer => writeAsync(buffer, 0, buffer.Length),
                    (buffer, written) => written);
        }

        /// <summary>
        /// Opens <paramref name="oriFileName"/> for asynchronous reading, creates a sibling file whose
        /// file name is prefixed with "copy-", and returns the observable that copies the first into
        /// the second.
        /// </summary>
        /// <remarks>
        /// Both streams are opened when this method is called (not when the result is subscribed) and
        /// are closed by a <c>Finally</c> action attached to the returned sequence, so the result is
        /// meant to be subscribed exactly once.
        /// </remarks>
        /// <param name="oriFileName">Path of the file to copy; may include a directory part.</param>
        /// <param name="readSize">Buffer size of the input stream and read size of the pipeline.</param>
        /// <param name="writeSize">Buffer size of the output stream and write size of the pipeline.</param>
        /// <returns>A sequence carrying one <see cref="Unit"/> per completed write.</returns>
        private static IObservable<Unit> CopyAsObservable(string oriFileName, int readSize, int writeSize)
        {
            Stream inputStream = new FileStream(oriFileName, FileMode.Open, FileAccess.Read,
                                                FileShare.Read, readSize, true);
            Stream outputStream = null;

            try
            {
                // Prefix only the file-name part so that "dir/file" becomes "dir/copy-file" rather than
                // the non-existent "copy-dir/file". GetDirectoryName yields "" for a bare file name (and
                // null for a root), in which case the result is the plain "copy-<name>" as before.
                string directory = Path.GetDirectoryName(oriFileName) ?? string.Empty;
                string cpyFileName = Path.Combine(directory, "copy-" + Path.GetFileName(oriFileName));

                outputStream = new FileStream(cpyFileName, FileMode.Create, FileAccess.Write, FileShare.None,
                                              writeSize, true);

                return InternalCopyAsObservable(inputStream, readSize, outputStream, writeSize)
                    .Finally(() =>
                    {
                        try
                        {
                            inputStream.Close();
                        }
                        finally
                        {
                            // Release the output file even when closing the input throws.
                            outputStream.Close();
                        }

                        Console.WriteLine("both input and output streams are closed.");
                    });
            }
            catch
            {
                // Nothing will ever run the Finally action above if we fail before returning,
                // so release whatever has been opened so far and let the caller see the error.
                if (outputStream != null)
                {
                    outputStream.Close();
                }

                inputStream.Close();
                throw;
            }
        }

        /// <summary>
        /// Demo: copies the sample capture file through the observable pipeline, printing a line for
        /// every completed write, for completion, and for failure, then waits in <c>Helper.Pause</c>.
        /// </summary>
        private static void TestCopyAsObservable()
        {
            const int readSize = 1024;
            const int writeSize = readSize * 3;
            IObservable<Unit> asyncCopying = CopyAsObservable("Sample1344.PmuCapture", readSize, writeSize);

            // chekanote: an observable generated by "Create" automatically unsubscribes its observers
            // once it completes, so the clean-up ("dispose") can occur even before we hit ENTER.
            // The using block below disposes the subscription once Helper.Pause returns.
            int index = 0;
            using (asyncCopying.Subscribe(
                _ =>
                {
                    ++index;
                    Console.WriteLine("{0}-th writing completes.", index);
                },
                // Without an error handler a failed read/write is rethrown by Rx instead of being
                // reported; print it so the demo shows what went wrong.
                error => Console.WriteLine("*** copying failed: {0} ***", error.Message),
                () => Console.WriteLine("*** copying is totally completed ***")))
            {
                Helper.Pause();
            }//using
        }
        #endregion

        #region "copy as task"

        /// <summary>
        /// Starts the observable-based file copy and exposes its outcome as a <see cref="Task"/>.
        /// </summary>
        /// <param name="oriFileName">File to copy; forwarded to <c>CopyAsObservable</c>.</param>
        /// <param name="readSize">Read size forwarded to <c>CopyAsObservable</c>.</param>
        /// <param name="writeSize">Write size forwarded to <c>CopyAsObservable</c>.</param>
        /// <returns>
        /// A task that completes when the copy sequence completes and faults with the sequence's
        /// error if it fails.
        /// </returns>
        private static Task CopyAsTask(string oriFileName, int readSize, int writeSize)
        {
            var completion = new TaskCompletionSource<object>();

            // Individual write notifications are dropped; only the terminal notification
            // (error or completion) is translated into the task's final state.
            CopyAsObservable(oriFileName, readSize, writeSize).Subscribe(
                written => { },
                failure => completion.SetException(failure),
                () => completion.SetResult(null));

            return completion.Task;
        }

        /// <summary>
        /// Demo: copies the sample capture file through the Task wrapper, reports whether the copy
        /// finished or failed, then waits in <c>Helper.Pause</c>.
        /// </summary>
        private static void TestCopyAsTask()
        {
            const int readSize = 1024;
            const int writeSize = readSize * 4;
            Task copyTask = CopyAsTask("Sample1344.PmuCapture", readSize, writeSize);

            copyTask.ContinueWith(antecedent =>
            {
                if (antecedent.IsFaulted)
                {
                    // A continuation also runs for a faulted task; report the failure instead of
                    // claiming success. Reading Exception also marks the fault as observed.
                    Console.WriteLine("!!! copy failed: {0} !!!",
                                      antecedent.Exception.GetBaseException().Message);
                }
                else
                {
                    Console.WriteLine("!!! copy finished. !!!");
                }
            });

            Helper.Pause();
        }

        #endregion

        /// <summary>
        /// Entry point of this demo class: runs the Task-based copy test.
        /// </summary>
        public static void TestMain()
        {
            // Alternative demo, currently disabled: subscribes to the copy observable directly.
            // TestCopyAsObservable();
            TestCopyAsTask();
        }
    }
}
